package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class GroupTop3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("GroupTop3").setMaster("local");
        final JavaSparkContext sc = new JavaSparkContext(conf);

        String path = "E:\\BaiduNetdiskDownload\\Spark从入门到精通（Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端）\\第40讲-Spark核心编程：高级编程之topn\\文档\\score.txt";
        final JavaRDD<String> lines = sc.textFile(path);
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] lineSplited = line.split(" ");
                return new Tuple2<String, Integer>(lineSplited[0], Integer.valueOf(lineSplited[1]));
            }
        });

        JavaPairRDD<String, Iterable<Integer>> groupedClassScores = pairs.groupByKey();

        final JavaPairRDD<String, Iterable<Integer>> top3ClassScores = groupedClassScores.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            @Override
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                String className = t._1;
                Iterator<Integer> iterator = t._2.iterator();
                Integer[] top3Scores = new Integer[3];
                while (iterator.hasNext()) {
                    Integer score = iterator.next();
                    for (int i = 0; i < 3; i++) {
                        if (top3Scores[i] == null) {
                            top3Scores[i] = score;
                            break;
                        } else if (score > top3Scores[i]) {
                            for(int j = 2; j > i; j--) {
                                top3Scores[j] = top3Scores[j -1];
                            }
                            top3Scores[i] = score;
//                            Integer tmp = top3Scores[i];
//                            top3Scores[i] = score;
//                            if(i < top3Scores.length -1) {
//                                top3Scores[i + 1] = tmp;
//                            }
                            break;
                        }
                    }
                }
                return new Tuple2<String, Iterable<Integer>>(className, Arrays.asList(top3Scores));
            }
        });

        top3ClassScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                System.out.println("班级：" + t._1);
                Iterator<Integer> iterator = t._2.iterator();
                while (iterator.hasNext()) {
                    System.out.println("成绩：" + iterator.next());
                }
                System.out.println("=====================================");
            }
        });

        sc.close();
    }
}
